###
### loop over ads
### for the T / C groups associated with each ad,
### produce series of news viewing time by hour pre/post,
### computed from individual-level (device) viewing intervals and summed within each group
###

library(dplyr)
library(purrr)
library(tidyr)
library(forcats)
library(stringr)
library(readr)
library(tibble)
library(data.table)
library(parallel)
library(lubridate)
library(hms)
library(magrittr)

# restrict data.table to a single thread
setDTthreads(1)
### SET WORKING DIRECTORY HERE 
path_to_archive <- "replication/"
setwd(path_to_archive)


# ad-level data set
ads <- readRDS("data/final_ads.rds")

# dimensions of the ads table; recorded output:
dim(ads)
# [1] 286863     65

# number of ads by type; recorded output:
table(ads$ad_type)
# cand_house      cand_pres    cand_senate cand_statewide        outside 
#      52051          76172          65776          22935          69929 

# DMA -> timezone lookup, used for timezone adjustments:
# keep only the code and zone columns, drop rows without a DMA code,
# and translate the zone abbreviations into tz database names
load("data/dma_timezone.RData")
dma_timezone <- as.data.table(
	dma_timezone %>%
		select(dma_code, timezone) %>%
		filter(!is.na(dma_code)) %>%
		mutate(timezone = recode(timezone,
								 ETZ = "America/New_York",
								 CTZ = "America/Chicago",
								 PTZ = "America/Los_Angeles"))
)

# Total viewing time, in seconds, that falls inside one hour-long window
# positioned relative to an ad.
#
# The window runs from ad_time + `hour` hours to ad_time + (`hour` + 1) hours,
# so hour = -1 is the hour ending at the ad and hour = 0 the hour starting at it.
#
# hour:           offset of the window start from ad_time, in hours (may be negative)
# view_beg_time:  start times of the viewing intervals
# view_end_time:  end times of the viewing intervals, parallel to view_beg_time
# ad_time:        time the ad aired
#                 (time arguments are presumably POSIXct -- TODO confirm)
#
# Returns a single number: the overlap of every interval with the window,
# summed over intervals; intervals with missing times contribute nothing.
get_time_in_hour_window <- function(hour, view_beg_time, view_end_time, ad_time) {
	window_start <- ad_time + dhours(hour)
	window_end <- ad_time + dhours(hour + 1)

	# per-interval overlap with the window; negative when the two are disjoint
	overlap <- pmin(view_end_time, window_end) - pmax(view_beg_time, window_start)

	overlap %>%
		time_length(unit = "second") %>%
		pmax(0) %>%              # disjoint intervals count as zero, not negative
		sum(na.rm = TRUE)

}


# Build the hourly pre/post news-viewing table for the ads of one day.
#
# Reads the T/C group definitions for date `d` ("data/dd/tc_groups_<d>.rds",
# one row per ad with a list-column `t_c` of data.tables) and the news viewing
# intervals for d-1, d and d+1, matches viewing intervals to group members by
# device_id, and sums, for every ad x group, the seconds viewed in each of the
# 24 one-hour windows before and the 24 one-hour windows after the ad.
#
# d: a Date; used both in the file names and for the d-1 / d+1 lookups.
#
# Returns a data.table with one row per ad x group ("C1", "C2", "C1+C2", "T"),
# the ad identifiers and group sizes (n_t, n_c1, n_c2, n_c12), and the columns
# s_pre_24 ... s_pre_1, s_post_1 ... s_post_24 (seconds). Groups with no
# matched viewing get 0 in every hour column.
compute_pre_post_hourly <- function(d) {

	# columns identifying one ad (with its group sizes); adding "group"
	# identifies one ad x group cell
	ad_cols <- c("dma_code", "channel", "affiliate", "program", "ad_time_utc",
				 "n_t", "n_c1", "n_c2", "n_c12")
	ad_group_cols <- c(ad_cols, "group")

	# hour offsets relative to the ad and the matching output column names:
	# offset -24 -> s_pre_24, ..., -1 -> s_pre_1, 0 -> s_post_1, ..., 23 -> s_post_24
	hour_offsets <- -24:23
	hour_cols <- c(paste0("s_pre_", 24:1), paste0("s_post_", 1:24))

	cat(as.character(d), "\n")
	cat("\tLoading T/C group definition...\n")
	# count the members of each group per ad, and keep only ads that have at
	# least one treated device and at least one control device of any kind
	t_c_by_ad <- readRDS(paste0("data/dd/tc_groups_", as.character(d), ".rds")) %>%
		mutate(n_t = map_dbl(t_c, ~ .[group == "T", .N]),
			   n_c1 = map_dbl(t_c, ~ .[group == "C1", .N]),
			   n_c2 = map_dbl(t_c, ~ .[group == "C2", .N]),
			   n_c12 = map_dbl(t_c, ~ .[group == "C1+C2", .N])) %>%
		filter(n_t > 0, (n_c1 + n_c2 + n_c12 > 0))


	cat("\tReading news viewing intervals...\n")
	# the +/- 24h windows around an ad can reach into the adjacent days
	news_view_yday <- readRDS(paste0("data/news_intervals/news_intervals_", as.character(d - 1), ".rds"))
	news_view_tday <- readRDS(paste0("data/news_intervals/news_intervals_", as.character(d), ".rds"))
	news_view_tmw <- readRDS(paste0("data/news_intervals/news_intervals_", as.character(d + 1), ".rds"))

	news_view <- rbindlist(list(news_view_yday, news_view_tday, news_view_tmw)) %>%
		.[, .(device_id, event_time_utc, event_time_utc_end)] %>%
		setkey(device_id)

	cat("\tMain ads loop...")

	# one row per ad x device; the ad's own event_time_utc is renamed so it
	# does not collide with the viewing-interval column of the same name
	dev_stack <- t_c_by_ad %>%
		unnest(cols = c(t_c)) %>%
		rename(ad_time_utc = event_time_utc) %>%
		as.data.table %>%
		setkey(device_id)

	# every viewing interval of every device, repeated for each ad x group the
	# device belongs to; devices with no viewing intervals drop out (nomatch = 0)
	expand_view <- news_view[dev_stack, nomatch = 0, allow.cartesian = TRUE]


	setkeyv(expand_view, c(ad_group_cols, "device_id"))

	# seconds viewed per hour window, summed over all devices of the ad x group
	view_hourly <- expand_view[,
		j = setNames(
			map(hour_offsets,
				get_time_in_hour_window,
				view_beg_time = event_time_utc,
				view_end_time = event_time_utc_end,
				ad_time = ad_time_utc),
			hour_cols),
		by = ad_group_cols]

	# fill implicit missings (when no viewership of some channel for some group)
	# this produces all combos of ad id, T/C group
	# NOTE(review): the ad list is taken from view_hourly, so an ad for which no
	# group had any matched viewing is absent here and from the result -- confirm
	# this is intended
	all_ads <- unique(view_hourly[, ad_cols, with = FALSE])[,
		.(group = c("C1", "C2", "C1+C2", "T")),
		by = ad_cols]

	# this does the expansion and fill steps: ad x group cells without a match
	# in view_hourly come back as NA and are set to 0; the hour columns are
	# addressed by name so the fill does not depend on column positions
	filled <- view_hourly[all_ads, on = ad_group_cols]
	filled[is.na(s_pre_24), (hour_cols) := 0]
	filled

}


# one job per day, 2012-09-01 through 2012-11-06, run on forked workers
dates <- seq(mdy("09/01/2012"), mdy("11/06/2012"), by = "days")
results <- mclapply(dates, FUN = compute_pre_post_hourly, mc.cores = 20)

# mclapply hands back a "try-error" object for a job that raised an error and
# NULL for a job whose worker died; rbindlist skips NULL items, which would
# silently drop that day from the output, so fail loudly before binding
failed <- vapply(
	results,
	function(r) is.null(r) || inherits(r, "try-error"),
	logical(1)
)
if (any(failed)) {
	stop("compute_pre_post_hourly failed for: ",
		 paste(as.character(dates[failed]), collapse = ", "),
		 call. = FALSE)
}

results %>%
	rbindlist %>%
	saveRDS("data/all_dd_data.rds")
